/*

 * Licensed to the Apache Software Foundation (ASF) under one

 * or more contributor license agreements.  See the NOTICE file

 * distributed with this work for additional information

 * regarding copyright ownership.  The ASF licenses this file

 * to you under the Apache License, Version 2.0 (the

 * "License"); you may not use this file except in compliance

 * with the License.  You may obtain a copy of the License at

 *

 *     http://www.apache.org/licenses/LICENSE-2.0

 *

 * Unless required by applicable law or agreed to in writing, software

 * distributed under the License is distributed on an "AS IS" BASIS,

 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

 * See the License for the specific language governing permissions and

 * limitations under the License.

 */

package com.bff.gaia.unified.fn.harness;



import com.google.auto.service.AutoService;

import com.bff.gaia.unified.fn.harness.DoFnPTransformRunnerFactory.Context;

import com.bff.gaia.unified.fn.harness.state.FnApiStateAccessor;

import com.bff.gaia.unified.runners.core.DoFnRunner;

import com.bff.gaia.unified.runners.core.LateDataUtils;

import com.bff.gaia.unified.runners.core.construction.PTransformTranslation;

import com.bff.gaia.unified.runners.core.construction.ParDoTranslation;

import com.bff.gaia.unified.runners.core.construction.Timer;

import com.bff.gaia.unified.sdk.coders.Coder;

import com.bff.gaia.unified.sdk.fn.data.FnDataReceiver;

import com.bff.gaia.unified.sdk.options.PipelineOptions;

import com.bff.gaia.unified.sdk.state.State;

import com.bff.gaia.unified.sdk.state.StateSpec;

import com.bff.gaia.unified.sdk.state.TimeDomain;

import com.bff.gaia.unified.sdk.transforms.DoFn;

import com.bff.gaia.unified.sdk.transforms.DoFn.MultiOutputReceiver;

import com.bff.gaia.unified.sdk.transforms.DoFn.OutputReceiver;

import com.bff.gaia.unified.sdk.transforms.DoFnOutputReceivers;

import com.bff.gaia.unified.sdk.transforms.DoFnSchemaInformation;

import com.bff.gaia.unified.sdk.transforms.SerializableFunction;

import com.bff.gaia.unified.sdk.transforms.reflect.DoFnInvoker;

import com.bff.gaia.unified.sdk.transforms.reflect.DoFnInvokers;

import com.bff.gaia.unified.sdk.transforms.reflect.DoFnSignature.StateDeclaration;

import com.bff.gaia.unified.sdk.transforms.reflect.DoFnSignature.TimerDeclaration;

import com.bff.gaia.unified.sdk.transforms.reflect.DoFnSignatures;

import com.bff.gaia.unified.sdk.transforms.splittabledofn.RestrictionTracker;

import com.bff.gaia.unified.sdk.transforms.windowing.BoundedWindow;

import com.bff.gaia.unified.sdk.transforms.windowing.PaneInfo;

import com.bff.gaia.unified.sdk.util.UserCodeException;

import com.bff.gaia.unified.sdk.util.WindowedValue;

import com.bff.gaia.unified.sdk.values.KV;

import com.bff.gaia.unified.sdk.values.PCollectionView;

import com.bff.gaia.unified.sdk.values.Row;

import com.bff.gaia.unified.sdk.values.TupleTag;

import com.bff.gaia.unified.vendor.guava.com.google.common.base.MoreObjects;

import com.bff.gaia.unified.vendor.guava.com.google.common.collect.ImmutableMap;

import org.joda.time.DateTimeUtils;

import org.joda.time.Duration;

import org.joda.time.Instant;



import java.io.IOException;

import java.util.Collection;

import java.util.Iterator;

import java.util.Map;



import static com.bff.gaia.unified.vendor.guava.com.google.common.base.Preconditions.*;



/**

 * A {@link DoFnRunner} specific to integrating with the Fn Api. This is to remove the layers of

 * abstraction caused by StateInternals/TimerInternals since they model state and timer concepts

 * differently.

 */

public class FnApiDoFnRunner<InputT, OutputT>

    implements DoFnPTransformRunnerFactory.DoFnPTransformRunner<InputT> {

  /** A registrar which provides a factory to handle Java {@link DoFn}s. */

  @AutoService(PTransformRunnerFactory.Registrar.class)

  public static class Registrar implements PTransformRunnerFactory.Registrar {

    @Override

    public Map<String, PTransformRunnerFactory> getPTransformRunnerFactories() {

      return ImmutableMap.of(PTransformTranslation.PAR_DO_TRANSFORM_URN, new Factory());

    }

  }



  static class Factory<InputT, OutputT>

      extends DoFnPTransformRunnerFactory<

          InputT, InputT, OutputT, FnApiDoFnRunner<InputT, OutputT>> {

    @Override

    public FnApiDoFnRunner<InputT, OutputT> createRunner(Context<InputT, OutputT> context) {

      return new FnApiDoFnRunner<>(context);

    }

  }



  //////////////////////////////////////////////////////////////////////////////////////////////////



  private final Context<InputT, OutputT> context;

  private final Collection<FnDataReceiver<WindowedValue<OutputT>>> mainOutputConsumers;

  private FnApiStateAccessor stateAccessor;

  private final DoFnInvoker<InputT, OutputT> doFnInvoker;

  private final DoFn<InputT, OutputT>.StartBundleContext startBundleContext;

  private final ProcessBundleContext processContext;

  private final OnTimerContext onTimerContext;

  private final DoFn<InputT, OutputT>.FinishBundleContext finishBundleContext;



  /** Only valid during {@link #processElement}, null otherwise. */

  private WindowedValue<InputT> currentElement;



  /** Only valid during {@link #processElement} and {@link #processTimer}, null otherwise. */

  private BoundedWindow currentWindow;



  /** Only valid during {@link #processTimer}, null otherwise. */

  private WindowedValue<KV<Object, Timer>> currentTimer;



  /** Only valid during {@link #processTimer}, null otherwise. */

  private TimeDomain currentTimeDomain;



  private DoFnSchemaInformation doFnSchemaInformation;



  FnApiDoFnRunner(Context<InputT, OutputT> context) {

    this.context = context;



    this.mainOutputConsumers =

        (Collection<FnDataReceiver<WindowedValue<OutputT>>>)

            (Collection) context.localNameToConsumer.get(context.mainOutputTag.getId());

    this.doFnSchemaInformation = ParDoTranslation.getSchemaInformation(context.parDoPayload);

    this.doFnInvoker = DoFnInvokers.invokerFor(context.doFn);

    this.doFnInvoker.invokeSetup();



    this.startBundleContext =

        this.context.doFn.new StartBundleContext() {

          @Override

          public PipelineOptions getPipelineOptions() {

            return context.pipelineOptions;

          }

        };

    this.processContext = new ProcessBundleContext();

    this.onTimerContext = new OnTimerContext();

    this.finishBundleContext =

        this.context.doFn.new FinishBundleContext() {

          @Override

          public PipelineOptions getPipelineOptions() {

            return context.pipelineOptions;

          }



          @Override

          public void output(OutputT output, Instant timestamp, BoundedWindow window) {

            outputTo(

                mainOutputConsumers,

                WindowedValue.of(output, timestamp, window, PaneInfo.NO_FIRING));

          }



          @Override

          public <T> void output(

			  TupleTag<T> tag, T output, Instant timestamp, BoundedWindow window) {

            Collection<FnDataReceiver<WindowedValue<T>>> consumers =

                (Collection) context.localNameToConsumer.get(tag.getId());

            if (consumers == null) {

              throw new IllegalArgumentException(String.format("Unknown output tag %s", tag));

            }

            outputTo(consumers, WindowedValue.of(output, timestamp, window, PaneInfo.NO_FIRING));

          }

        };

  }



  @Override

  public void startBundle() {

    this.stateAccessor =

        new FnApiStateAccessor(

            context.pipelineOptions,

            context.ptransformId,

            context.processBundleInstructionId,

            context.tagToSideInputSpecMap,

            context.unifiedFnStateClient,

            context.keyCoder,

            (Coder<BoundedWindow>) context.windowCoder,

            () -> MoreObjects.firstNonNull(currentElement, currentTimer),

            () -> currentWindow);



    doFnInvoker.invokeStartBundle(startBundleContext);

  }



  @Override

  public void processElement(WindowedValue<InputT> elem) {

    currentElement = elem;

    try {

      Iterator<BoundedWindow> windowIterator =

          (Iterator<BoundedWindow>) elem.getWindows().iterator();

      while (windowIterator.hasNext()) {

        currentWindow = windowIterator.next();

        doFnInvoker.invokeProcessElement(processContext);

      }

    } finally {

      currentElement = null;

      currentWindow = null;

    }

  }



  @Override

  public void processTimer(

	  String timerId, TimeDomain timeDomain, WindowedValue<KV<Object, Timer>> timer) {

    currentTimer = timer;

    currentTimeDomain = timeDomain;

    try {

      Iterator<BoundedWindow> windowIterator =

          (Iterator<BoundedWindow>) timer.getWindows().iterator();

      while (windowIterator.hasNext()) {

        currentWindow = windowIterator.next();

        doFnInvoker.invokeOnTimer(timerId, onTimerContext);

      }

    } finally {

      currentTimer = null;

      currentTimeDomain = null;

      currentWindow = null;

    }

  }



  @Override

  public void finishBundle() {

    doFnInvoker.invokeFinishBundle(finishBundleContext);



    // TODO: Support caching state data across bundle boundaries.

    this.stateAccessor.finalizeState();

    this.stateAccessor = null;

  }



  /** Outputs the given element to the specified set of consumers wrapping any exceptions. */

  private <T> void outputTo(

	  Collection<FnDataReceiver<WindowedValue<T>>> consumers, WindowedValue<T> output) {

    try {

      for (FnDataReceiver<WindowedValue<T>> consumer : consumers) {

        consumer.accept(output);

      }

    } catch (Throwable t) {

      throw UserCodeException.wrap(t);

    }

  }



  private class FnApiTimer implements com.bff.gaia.unified.sdk.state.Timer {

    private final String timerId;

    private final TimeDomain timeDomain;

    private final Instant currentTimestamp;

    private final Duration allowedLateness;

    private final WindowedValue<?> currentElementOrTimer;



    private Duration period = Duration.ZERO;

    private Duration offset = Duration.ZERO;



    FnApiTimer(String timerId, WindowedValue<KV<?, ?>> currentElementOrTimer) {

      this.timerId = timerId;

      this.currentElementOrTimer = currentElementOrTimer;



      TimerDeclaration timerDeclaration = context.doFnSignature.timerDeclarations().get(timerId);

      this.timeDomain =

          DoFnSignatures.getTimerSpecOrThrow(timerDeclaration, context.doFn).getTimeDomain();



      switch (timeDomain) {

        case EVENT_TIME:

          this.currentTimestamp = currentElementOrTimer.getTimestamp();

          break;

        case PROCESSING_TIME:

          this.currentTimestamp = new Instant(DateTimeUtils.currentTimeMillis());

          break;

        case SYNCHRONIZED_PROCESSING_TIME:

          this.currentTimestamp = new Instant(DateTimeUtils.currentTimeMillis());

          break;

        default:

          throw new IllegalArgumentException(String.format("Unknown time domain %s", timeDomain));

      }



      try {

        this.allowedLateness =

            context

                .rehydratedComponents

                .getPCollection(context.pTransform.getInputsOrThrow(timerId))

                .getWindowingStrategy()

                .getAllowedLateness();

      } catch (IOException e) {

        throw new IllegalArgumentException(

            String.format("Unable to get allowed lateness for timer %s", timerId));

      }

    }



    @Override

    public void set(Instant absoluteTime) {

      // Verifies that the time domain of this timer is acceptable for absolute timers.

      if (!TimeDomain.EVENT_TIME.equals(timeDomain)) {

        throw new IllegalArgumentException(

            "Can only set relative timers in processing time domain. Use #setRelative()");

      }



      // Ensures that the target time is reasonable. For event time timers this means that the time

      // should be prior to window GC time.

      if (TimeDomain.EVENT_TIME.equals(timeDomain)) {

        Instant windowExpiry = LateDataUtils.garbageCollectionTime(currentWindow, allowedLateness);

        checkArgument(

            !absoluteTime.isAfter(windowExpiry),

            "Attempted to set event time timer for %s but that is after"

                + " the expiration of window %s",

            absoluteTime,

            windowExpiry);

      }



      output(absoluteTime);

    }



    @Override

    public void setRelative() {

      Instant target;

      if (period.equals(Duration.ZERO)) {

        target = currentTimestamp.plus(offset);

      } else {

        long millisSinceStart = currentTimestamp.plus(offset).getMillis() % period.getMillis();

        target =

            millisSinceStart == 0

                ? currentTimestamp

                : currentTimestamp.plus(period).minus(millisSinceStart);

      }

      target = minTargetAndGcTime(target);

      output(target);

    }



    @Override

    public com.bff.gaia.unified.sdk.state.Timer offset(Duration offset) {

      this.offset = offset;

      return this;

    }



    @Override

    public com.bff.gaia.unified.sdk.state.Timer align(Duration period) {

      this.period = period;

      return this;

    }



    /**

     * For event time timers the target time should be prior to window GC time. So it returns

     * min(time to set, GC Time of window).

     */

    private Instant minTargetAndGcTime(Instant target) {

      if (TimeDomain.EVENT_TIME.equals(timeDomain)) {

        Instant windowExpiry = LateDataUtils.garbageCollectionTime(currentWindow, allowedLateness);

        if (target.isAfter(windowExpiry)) {

          return windowExpiry;

        }

      }

      return target;

    }



    private void output(Instant scheduledTime) {

      Object key = ((KV) currentElementOrTimer.getValue()).getKey();

      Collection<FnDataReceiver<WindowedValue<KV<Object, Timer>>>> consumers =

          (Collection) context.localNameToConsumer.get(timerId);



      outputTo(consumers, currentElementOrTimer.withValue(KV.of(key, Timer.of(scheduledTime))));

    }

  }



  /**

   * Provides arguments for a {@link DoFnInvoker} for {@link DoFn.ProcessElement @ProcessElement}.

   */

  private class ProcessBundleContext extends DoFn<InputT, OutputT>.ProcessContext

      implements DoFnInvoker.ArgumentProvider<InputT, OutputT> {



    private ProcessBundleContext() {

      context.doFn.super();

    }



    @Override

    public BoundedWindow window() {

      return currentWindow;

    }



    @Override

    public PaneInfo paneInfo(DoFn<InputT, OutputT> doFn) {

      return pane();

    }



    @Override

    public DoFn<InputT, OutputT>.StartBundleContext startBundleContext(DoFn<InputT, OutputT> doFn) {

      throw new UnsupportedOperationException(

          "Cannot access StartBundleContext outside of @StartBundle method.");

    }



    @Override

    public DoFn<InputT, OutputT>.FinishBundleContext finishBundleContext(

        DoFn<InputT, OutputT> doFn) {

      throw new UnsupportedOperationException(

          "Cannot access FinishBundleContext outside of @FinishBundle method.");

    }



    @Override

    public DoFn<InputT, OutputT>.ProcessContext processContext(DoFn<InputT, OutputT> doFn) {

      return this;

    }



    @Override

    public InputT element(DoFn<InputT, OutputT> doFn) {

      return element();

    }



    @Override

    public Object schemaElement(int index) {

      SerializableFunction converter = doFnSchemaInformation.getElementConverters().get(index);

      return converter.apply(element());

    }



    @Override

    public Instant timestamp(DoFn<InputT, OutputT> doFn) {

      return timestamp();

    }



    @Override

    public TimeDomain timeDomain(DoFn<InputT, OutputT> doFn) {

      throw new UnsupportedOperationException(

          "Cannot access time domain outside of @ProcessTimer method.");

    }



    @Override

    public OutputReceiver<OutputT> outputReceiver(DoFn<InputT, OutputT> doFn) {

      return DoFnOutputReceivers.windowedReceiver(this, null);

    }



    @Override

    public OutputReceiver<Row> outputRowReceiver(DoFn<InputT, OutputT> doFn) {

      return DoFnOutputReceivers.rowReceiver(this, null, context.mainOutputSchemaCoder);

    }



    @Override

    public MultiOutputReceiver taggedOutputReceiver(DoFn<InputT, OutputT> doFn) {

      return DoFnOutputReceivers.windowedMultiReceiver(this, context.outputCoders);

    }



    @Override

    public DoFn<InputT, OutputT>.OnTimerContext onTimerContext(DoFn<InputT, OutputT> doFn) {

      throw new UnsupportedOperationException(

          "Cannot access OnTimerContext outside of @OnTimer methods.");

    }



    @Override

    public RestrictionTracker<?, ?> restrictionTracker() {

      throw new UnsupportedOperationException("RestrictionTracker parameters are not supported.");

    }



    @Override

    public State state(String stateId) {

      StateDeclaration stateDeclaration = context.doFnSignature.stateDeclarations().get(stateId);

      checkNotNull(stateDeclaration, "No state declaration found for %s", stateId);

      StateSpec<?> spec;

      try {

        spec = (StateSpec<?>) stateDeclaration.field().get(context.doFn);

      } catch (IllegalAccessException e) {

        throw new RuntimeException(e);

      }

      return spec.bind(stateId, stateAccessor);

    }



    @Override

    public com.bff.gaia.unified.sdk.state.Timer timer(String timerId) {

      checkState(

          currentElement.getValue() instanceof KV,

          "Accessing timer in unkeyed context. Current element is not a KV: %s.",

          currentElement.getValue());



      return new FnApiTimer(timerId, (WindowedValue) currentElement);

    }



    @Override

    public PipelineOptions getPipelineOptions() {

      return context.pipelineOptions;

    }



    @Override

    public PipelineOptions pipelineOptions() {

      return context.pipelineOptions;

    }



    @Override

    public void output(OutputT output) {

      outputTo(

          mainOutputConsumers,

          WindowedValue.of(

              output, currentElement.getTimestamp(), currentWindow, currentElement.getPane()));

    }



    @Override

    public void outputWithTimestamp(OutputT output, Instant timestamp) {

      outputTo(

          mainOutputConsumers,

          WindowedValue.of(output, timestamp, currentWindow, currentElement.getPane()));

    }



    @Override

    public <T> void output(TupleTag<T> tag, T output) {

      Collection<FnDataReceiver<WindowedValue<T>>> consumers =

          (Collection) context.localNameToConsumer.get(tag.getId());

      if (consumers == null) {

        throw new IllegalArgumentException(String.format("Unknown output tag %s", tag));

      }

      outputTo(

          consumers,

          WindowedValue.of(

              output, currentElement.getTimestamp(), currentWindow, currentElement.getPane()));

    }



    @Override

    public <T> void outputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {

      Collection<FnDataReceiver<WindowedValue<T>>> consumers =

          (Collection) context.localNameToConsumer.get(tag.getId());

      if (consumers == null) {

        throw new IllegalArgumentException(String.format("Unknown output tag %s", tag));

      }

      outputTo(

          consumers, WindowedValue.of(output, timestamp, currentWindow, currentElement.getPane()));

    }



    @Override

    public InputT element() {

      return currentElement.getValue();

    }



    @Override

    public <T> T sideInput(PCollectionView<T> view) {

      return stateAccessor.get(view, currentWindow);

    }



    @Override

    public Instant timestamp() {

      return currentElement.getTimestamp();

    }



    @Override

    public PaneInfo pane() {

      return currentElement.getPane();

    }



    @Override

    public void updateWatermark(Instant watermark) {

      throw new UnsupportedOperationException("TODO: Add support for SplittableDoFn");

    }

  }



  /** Provides arguments for a {@link DoFnInvoker} for {@link DoFn.OnTimer @OnTimer}. */

  private class OnTimerContext extends DoFn<InputT, OutputT>.OnTimerContext

      implements DoFnInvoker.ArgumentProvider<InputT, OutputT> {



    private OnTimerContext() {

      context.doFn.super();

    }



    @Override

    public BoundedWindow window() {

      return currentWindow;

    }



    @Override

    public PaneInfo paneInfo(DoFn<InputT, OutputT> doFn) {

      throw new UnsupportedOperationException(

          "Cannot access paneInfo outside of @ProcessElement methods.");

    }



    @Override

    public DoFn<InputT, OutputT>.StartBundleContext startBundleContext(DoFn<InputT, OutputT> doFn) {

      throw new UnsupportedOperationException(

          "Cannot access StartBundleContext outside of @StartBundle method.");

    }



    @Override

    public DoFn<InputT, OutputT>.FinishBundleContext finishBundleContext(

        DoFn<InputT, OutputT> doFn) {

      throw new UnsupportedOperationException(

          "Cannot access FinishBundleContext outside of @FinishBundle method.");

    }



    @Override

    public DoFn<InputT, OutputT>.ProcessContext processContext(DoFn<InputT, OutputT> doFn) {

      throw new UnsupportedOperationException(

          "Cannot access ProcessContext outside of @ProcessElement method.");

    }



    @Override

    public InputT element(DoFn<InputT, OutputT> doFn) {

      throw new UnsupportedOperationException("Element parameters are not supported.");

    }



    @Override

    public Object schemaElement(int index) {

      throw new UnsupportedOperationException("Element parameters are not supported.");

    }



    @Override

    public Instant timestamp(DoFn<InputT, OutputT> doFn) {

      return timestamp();

    }



    @Override

    public TimeDomain timeDomain(DoFn<InputT, OutputT> doFn) {

      return timeDomain();

    }



    @Override

    public OutputReceiver<OutputT> outputReceiver(DoFn<InputT, OutputT> doFn) {

      return DoFnOutputReceivers.windowedReceiver(this, null);

    }



    @Override

    public OutputReceiver<Row> outputRowReceiver(DoFn<InputT, OutputT> doFn) {

      return DoFnOutputReceivers.rowReceiver(this, null, context.mainOutputSchemaCoder);

    }



    @Override

    public MultiOutputReceiver taggedOutputReceiver(DoFn<InputT, OutputT> doFn) {

      return DoFnOutputReceivers.windowedMultiReceiver(this);

    }



    @Override

    public DoFn<InputT, OutputT>.OnTimerContext onTimerContext(DoFn<InputT, OutputT> doFn) {

      return this;

    }



    @Override

    public RestrictionTracker<?, ?> restrictionTracker() {

      throw new UnsupportedOperationException("RestrictionTracker parameters are not supported.");

    }



    @Override

    public State state(String stateId) {

      StateDeclaration stateDeclaration = context.doFnSignature.stateDeclarations().get(stateId);

      checkNotNull(stateDeclaration, "No state declaration found for %s", stateId);

      StateSpec<?> spec;

      try {

        spec = (StateSpec<?>) stateDeclaration.field().get(context.doFn);

      } catch (IllegalAccessException e) {

        throw new RuntimeException(e);

      }

      return spec.bind(stateId, stateAccessor);

    }



    @Override

    public com.bff.gaia.unified.sdk.state.Timer timer(String timerId) {

      checkState(

          currentTimer.getValue() instanceof KV,

          "Accessing timer in unkeyed context. Current timer is not a KV: %s.",

          currentTimer);



      return new FnApiTimer(timerId, (WindowedValue) currentTimer);

    }



    @Override

    public PipelineOptions getPipelineOptions() {

      return context.pipelineOptions;

    }



    @Override

    public PipelineOptions pipelineOptions() {

      return context.pipelineOptions;

    }



    @Override

    public void output(OutputT output) {

      outputTo(

          mainOutputConsumers,

          WindowedValue.of(output, currentTimer.getTimestamp(), currentWindow, PaneInfo.NO_FIRING));

    }



    @Override

    public void outputWithTimestamp(OutputT output, Instant timestamp) {

      checkArgument(

          !currentTimer.getTimestamp().isAfter(timestamp),

          "Output time %s can not be before timer timestamp %s.",

          timestamp,

          currentTimer.getTimestamp());

      outputTo(

          mainOutputConsumers,

          WindowedValue.of(output, timestamp, currentWindow, PaneInfo.NO_FIRING));

    }



    @Override

    public <T> void output(TupleTag<T> tag, T output) {

      Collection<FnDataReceiver<WindowedValue<T>>> consumers =

          (Collection) context.localNameToConsumer.get(tag.getId());

      if (consumers == null) {

        throw new IllegalArgumentException(String.format("Unknown output tag %s", tag));

      }

      outputTo(

          consumers,

          WindowedValue.of(output, currentTimer.getTimestamp(), currentWindow, PaneInfo.NO_FIRING));

    }



    @Override

    public <T> void outputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {

      checkArgument(

          !currentTimer.getTimestamp().isAfter(timestamp),

          "Output time %s can not be before timer timestamp %s.",

          timestamp,

          currentTimer.getTimestamp());

      Collection<FnDataReceiver<WindowedValue<T>>> consumers =

          (Collection) context.localNameToConsumer.get(tag.getId());

      if (consumers == null) {

        throw new IllegalArgumentException(String.format("Unknown output tag %s", tag));

      }

      outputTo(consumers, WindowedValue.of(output, timestamp, currentWindow, PaneInfo.NO_FIRING));

    }



    @Override

    public TimeDomain timeDomain() {

      return currentTimeDomain;

    }



    @Override

    public Instant timestamp() {

      return currentTimer.getTimestamp();

    }

  }

}